feat: add CancellationToken with collect/executeStream overloads #69
Open
LantaoJin wants to merge 2 commits into
Tokens are session-allocated, can be fired from any thread, and abort the targeted call at its next cooperative poll point. Pre-stream cancellations surface as `java.util.concurrent.CancellationException`; mid-stream cancellations from `executeStream(..., token)` surface from `ArrowReader.loadNextBatch` as an `IOException` whose message contains `"query cancelled"` (the Arrow C-data wrapper hides the typed signal). Native handles are opaque registry IDs guarded by an `AtomicLong` on the Java side, so concurrent close + cancel is safe: the worst race outcome is a clean `IllegalStateException` from a missing-ID lookup rather than a use-after-free of a freed `Box`.
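Because the mid-stream signal arrives as a plain `IOException`, a caller that wants one exception type for both cancellation paths has to inspect the message. The following is a minimal sketch of that check, not code from this PR: `StreamCancellation`, `BatchLoader`, and `loadOrCancel` are hypothetical names, and `BatchLoader` only stands in for `ArrowReader.loadNextBatch` so the example compiles without Arrow on the classpath. The only detail taken from the description above is the `"query cancelled"` marker.

```java
import java.io.IOException;
import java.util.concurrent.CancellationException;

// Hypothetical helper, not part of the PR: re-types mid-stream cancellations.
final class StreamCancellation {
    // Marker text quoted in the PR description.
    private static final String MARKER = "query cancelled";

    // Stand-in for ArrowReader.loadNextBatch(), which returns boolean and throws IOException.
    interface BatchLoader {
        boolean loadNextBatch() throws IOException;
    }

    // True when the IOException carries the cancellation marker in its message.
    static boolean isQueryCancelled(IOException e) {
        String message = e.getMessage();
        return message != null && message.contains(MARKER);
    }

    // Loads one batch; a cancellation becomes CancellationException, other I/O errors pass through.
    static boolean loadOrCancel(BatchLoader reader) throws IOException {
        try {
            return reader.loadNextBatch();
        } catch (IOException e) {
            if (isQueryCancelled(e)) {
                CancellationException cancelled = new CancellationException(e.getMessage());
                cancelled.initCause(e); // keep the original exception for diagnostics
                throw cancelled;
            }
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        try {
            loadOrCancel(() -> { throw new IOException("stream failed: query cancelled"); });
        } catch (CancellationException e) {
            System.out.println("cancelled: " + e.getMessage());
        }
    }
}
```

Matching on message text is brittle, so a wrapper like this is best kept in one place where it can be updated if the wording changes.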
Which issue does this PR close?
`CancellationToken` on `SessionContext` #68.
Rationale for this change
A long-running `DataFrame.collect(allocator)` or `DataFrame.executeStream(allocator)` call blocks the calling Java thread for the full duration of the query. `Thread.interrupt()` is a no-op (the thread is parked inside `runtime().block_on(...)`), and there is no way to free the in-flight query's native resources early. For any embedder running request timeouts, user-cancel actions, or node-shutdown drains, this is a hard operational gap.
What changes are included in this PR?
The cancellation we ship lives on the session, not on the `DataFrame`. A token is allocated by the session, passed to `collect` / `executeStream`, and fired from any thread. We deliberately do not add `DataFrame.cancel()`: a `DataFrame` is a lazy plan that can be executed concurrently from multiple threads, so a per-`DataFrame` cancel verb has ambiguous semantics. The token is the primitive; a Spark-style `ctx.addTag(name)` / `ctx.cancelTag(name)` sugar layer can land in a follow-up.
Native shape: `tokio_util::sync::CancellationToken` per token, owned by a process-global registry keyed by an opaque `u64` ID. JNI handlers look up by ID under a `Mutex<HashMap>` and clone the `Arc` out before any blocking work. The cloned `Arc` keeps the inner token alive for the borrow's lifetime, so an in-flight `collect()` future that already holds a clone keeps working through a concurrent close. `close()` removes the registry entry; the underlying token drops only when the last `Arc` clone goes away.
Race safety: `CancellationToken.nativeHandle` is `final AtomicLong`. `cancel()` / `isCancelled()` read via `get()`; `close()` uses `getAndSet(0L)` so only the winning thread issues `closeToken`. Combined with the registry on the native side, a concurrent close + cancel + query-start race produces at worst a clean `IllegalStateException` from a missing-ID lookup -- never a use-after-free of a freed `Box`. The registry pattern is the same scaffolding upstream issue #40 calls for across all handle types; cancellation tokens get it first because they are designed to be fired from a thread that does not own them, so the race window is the widest of any handle.
Are these changes tested?
Yes -- 19 new tests across `CancellationTokenTest` and `DataFrameCancellationTest`.
Are there any user-facing changes?
Yes -- purely additive. New `CancellationToken` class, one new method on `SessionContext`, two new overloads on `DataFrame`. No API removals, no deprecations, no behaviour change for existing callers. New Cargo dependency: `tokio-util = { version = "0.7", features = ["rt"] }`; already a transitive dep, so no new crate in the build, just a feature flag added.
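For reviewers who want to reason about the close/cancel race described under "Race safety", here is a minimal pure-Java model of the handle pattern. It is a sketch under stated assumptions, not the PR's implementation: `TokenHandleModel` is a hypothetical name, the `ConcurrentHashMap` stands in for the native registry (which the description says is a Rust `Mutex<HashMap>` behind JNI), and the `closeCalls` counter stands in for the native `closeToken` call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the handle pattern; the real registry is native code.
final class TokenHandleModel {
    // Stand-in for the process-global native registry keyed by an opaque ID.
    private static final Map<Long, AtomicBoolean> REGISTRY = new ConcurrentHashMap<>();
    private static final AtomicLong NEXT_ID = new AtomicLong(1); // 0 is reserved for "closed"

    // Counts simulated closeToken calls so the single-winner property is observable.
    final AtomicLong closeCalls = new AtomicLong();
    private final AtomicLong nativeHandle;

    TokenHandleModel() {
        long id = NEXT_ID.getAndIncrement();
        REGISTRY.put(id, new AtomicBoolean(false));
        nativeHandle = new AtomicLong(id);
    }

    void cancel() {
        lookup(nativeHandle.get()).set(true);
    }

    boolean isCancelled() {
        return lookup(nativeHandle.get()).get();
    }

    // getAndSet(0L) hands the live ID to exactly one caller; later callers see 0 and do nothing.
    void close() {
        long id = nativeHandle.getAndSet(0L);
        if (id != 0L) {
            closeCalls.incrementAndGet();
            REGISTRY.remove(id);
        }
    }

    // A missing ID is reported as an exception instead of touching freed state.
    private static AtomicBoolean lookup(long id) {
        AtomicBoolean flag = REGISTRY.get(id);
        if (flag == null) {
            throw new IllegalStateException("unknown token id " + id);
        }
        return flag;
    }

    public static void main(String[] args) throws InterruptedException {
        TokenHandleModel token = new TokenHandleModel();
        token.cancel();
        Thread a = new Thread(token::close);
        Thread b = new Thread(token::close);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println("simulated closeToken calls: " + token.closeCalls.get());
        try {
            token.isCancelled();
        } catch (IllegalStateException e) {
            System.out.println("after close: " + e.getMessage());
        }
    }
}
```

In this model a `cancel()` that loses the race to `close()` fails the ID lookup and throws, while one that already obtained the flag reference completes against an object that is still alive, which loosely mirrors the cloned-`Arc` behaviour described for the native side.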